package com.fanjj.spark.spark.session

import java.util
import java.util.{ArrayList, Iterator, List}

import com.alibaba.fastjson
import com.alibaba.fastjson.JSONObject
import com.fanjj.spark.conf.ConfigurationManager
import com.fanjj.spark.constant.Constants
import com.fanjj.spark.entity.TaskEntity
import com.fanjj.spark.services.TaskService
import com.fanjj.spark.services.impl.TaskServiceImpl
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaSparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.hive.HiveContext
import com.fanjj.spark.{SparkApplication, test}
import com.fanjj.spark.test.MockData
import com.fanjj.spark.util.{ParamUtils, SparkUtil}
import org.apache.spark.api.java.function.PairFlatMapFunction
import org.apache.spark.storage.StorageLevel
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.SpringApplication
import org.springframework.context.ConfigurableApplicationContext

object UserVisitSessionAnalyzeSpark {

  /**
    * Entry point: boots the Spring context, creates the Spark context,
    * generates mock data (local mode), loads the first persisted task and
    * prints a sample of the user-action RDD for that task's date range.
    */
  def main(args: Array[String]): Unit = {

    // Spring context supplies the TaskService bean used below.
    val context = SpringApplication.run(classOf[SparkApplication])

    // NOTE(review): master is hard-coded to local[*] here, while the
    // setMaster(conf) helper below derives it from Constants.SPARK_LOCAL and
    // is never called — confirm which behavior is intended before deploying
    // to a cluster.
    val conf = new SparkConf().setAppName("CustomSort5").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val sqlContext = getSQLContext(sc)

    // Generate mock data (only effective when running locally).
    SparkUtil.mockData(sc, sqlContext)

    val taskService = context.getBean(classOf[TaskService])
    val tasks = taskService.list()
    // Assumes at least one task exists; .get(0) throws otherwise.
    val task: TaskEntity = tasks.get(0)

    // Load user actions for the task's date range; cached because the RDD
    // is expected to be reused by subsequent analysis stages.
    val actionRDD: RDD[(String, Row)] = SparkUtil.getActionRDDByDateRange(sqlContext, task)
    actionRDD.persist(StorageLevel.MEMORY_ONLY)
    println(actionRDD.take(10).toBuffer)
  }

  /**
    * Sets the SparkConf master to local[*] when the spark.local flag
    * (Constants.SPARK_LOCAL) is true; otherwise leaves the master unset so
    * it can be supplied externally (e.g. by spark-submit).
    *
    * @param conf the SparkConf to configure in place
    */
  def setMaster(conf: SparkConf): Unit = {
    val local = ConfigurationManager.getBoolean(Constants.SPARK_LOCAL)
    if (local) conf.setMaster("local[*]")
  }

  /**
    * Returns the SQL context appropriate for the run mode: a plain
    * SQLContext when spark.local is true, otherwise a HiveContext for
    * cluster execution against Hive tables.
    *
    * @param sc the active SparkContext
    * @return a SQLContext (local) or HiveContext (cluster)
    */
  def getSQLContext(sc: SparkContext): SQLContext = {
    val local = ConfigurationManager.getBoolean(Constants.SPARK_LOCAL)
    if (local) new SQLContext(sc)
    else new HiveContext(sc)
  }

  /**
    * Generates mock data when the spark.local flag is true; no-op otherwise.
    *
    * @param sc         Java-friendly Spark context handed to MockData
    * @param sqlContext context used to register the mock tables
    */
  def mockData(sc: JavaSparkContext, sqlContext: SQLContext): Unit = {
    val local = ConfigurationManager.getBoolean(Constants.SPARK_LOCAL)
    // BUG FIX: the original guard was `local.equals("spark.local")`, which
    // compares a Boolean to a String and is therefore always false — mock
    // data was never generated. Use the Boolean flag directly.
    if (local) MockData.mock(sc, sqlContext)
  }

  /**
    * Loads user_visit_action rows whose `date` column falls inside the
    * task's [start_date, end_date] range (inclusive on both ends).
    *
    * @param sqlContext SQL/Hive context to run the query on
    * @param taskParam  task parameters holding PARAM_START_DATE / PARAM_END_DATE
    * @return the matching rows as an RDD[Row]
    */
  def getActionRDDByDateRange(sqlContext: SQLContext, taskParam: JSONObject): RDD[Row] = {

    val startDate = ParamUtils.getParam(taskParam, Constants.PARAM_START_DATE)
    val endDate = ParamUtils.getParam(taskParam, Constants.PARAM_END_DATE)

    // NOTE(review): the dates are spliced into the SQL text. They come from
    // stored task configuration, but validating the expected yyyy-MM-dd
    // format before use would guard against malformed input / SQL injection.
    val sql =
      s"select *  from user_visit_action  where date>='$startDate'  and date<='$endDate'"
    //				+ "and session_id not in('','','')"

    val actionDF: DataFrame = sqlContext.sql(sql)

    /*
     * Spark SQL may give the first stage too few partitions (e.g. ~20 tasks)
     * for the actual data volume and algorithm complexity. If more
     * parallelism is needed, repartition the result here, e.g.:
     *   actionDF.rdd.repartition(1000)
     */

    // BUG FIX: the declared return type is RDD[Row], but the original
    // returned actionDF.toJavaRDD (a JavaRDD[Row]), which does not conform
    // to RDD[Row]. Use .rdd to obtain the underlying Scala RDD.
    actionDF.rdd
  }
}

